{
  "nbformat": 4,
  "nbformat_minor": 0,
  "metadata": {
    "colab": {
      "name": "Getting started -- Tour of Beam",
      "provenance": [],
      "toc_visible": true,
      "include_colab_link": true
    },
    "kernelspec": {
      "name": "python3",
      "display_name": "Python 3"
    }
  },
  "cells": [
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "view-in-github",
        "colab_type": "text"
      },
      "source": [
        "<a href=\"https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/tour-of-beam/getting-started.ipynb\" target=\"_parent\"><img src=\"https://colab.research.google.com/assets/colab-badge.svg\" alt=\"Open In Colab\"/></a>"
      ]
    },
    {
      "cell_type": "code",
      "metadata": {
        "id": "SSKEd7tP-b2k",
        "cellView": "form"
      },
      "source": [
        "#@title ###### Licensed to the Apache Software Foundation (ASF), Version 2.0 (the \"License\")\n",
        "\n",
        "# Licensed to the Apache Software Foundation (ASF) under one\n",
        "# or more contributor license agreements. See the NOTICE file\n",
        "# distributed with this work for additional information\n",
        "# regarding copyright ownership. The ASF licenses this file\n",
        "# to you under the Apache License, Version 2.0 (the\n",
        "# \"License\"); you may not use this file except in compliance\n",
        "# with the License. You may obtain a copy of the License at\n",
        "#\n",
        "#   http://www.apache.org/licenses/LICENSE-2.0\n",
        "#\n",
        "# Unless required by applicable law or agreed to in writing,\n",
        "# software distributed under the License is distributed on an\n",
        "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
        "# KIND, either express or implied. See the License for the\n",
        "# specific language governing permissions and limitations\n",
        "# under the License."
      ],
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "5UC_aGanx6oE"
      },
      "source": [
        "# Getting started: _Tour of Beam_\n",
        "\n",
        "[Apache Beam](https://beam.apache.org/get-started/beam-overview/)\n",
        "is a library for parallel data processing.\n",
        "\n",
        "Beam is commonly used for\n",
        "[Extract-Transform-Load (ETL)](https://en.wikipedia.org/wiki/Extract,_transform,_load)\n",
        "jobs, where we _extract_ data from a data source, _transform_ that data, and _load_ it into a data sink like a database.\n",
        "It does particularly well with large amounts of data since it can use mutliple machines to process everything at the same time.\n",
        "\n",
        "Let's begin by installing the `apache-beam` package with `pip`."
      ]
    },
    {
      "cell_type": "code",
      "metadata": {
        "id": "R_Yhoc6N_Flg"
      },
      "source": [
        "# Install apache-beam with pip.\n",
        "!pip install --quiet apache-beam"
      ],
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "WwxLB5EiVKP_"
      },
      "source": [
        "You can express a _data processing pipeline_, and then run it on the\n",
        "[_runner_ of your choice](https://beam.apache.org/documentation/runners/capability-matrix/).\n",
        "For now, we use the `DirectRunner` which runs locally for simplicity."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "8rfQeLk2y9lx"
      },
      "source": [
        "# What is a _pipeline_?\n",
        "\n",
        "A __pipeline__ is a __sequence of data transformations__.\n",
        "You can think of it like a production line,\n",
        "data comes in from one end,\n",
        "it gets transformed by each step.\n",
        "The outputs from one step are passed as inputs to the next step.\n",
        "\n",
        "In Beam, your data lives in a __`PCollection`__,\n",
        "which stands for _Parallel Collection_.\n",
        "A `PCollection` is like a __list of elements__,\n",
        "but without any order guarantees.\n",
        "This allows Beam to easily parallelize and distribute\n",
        "the `PCollection`'s elements."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "81VNY478gep_"
      },
      "source": [
        "![A PCollection is an unordered collection of elements]()"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "LaljqKvs4UU5"
      },
      "source": [
        "Once you have your data, the next step is to transform it.\n",
        "In Beam, you transform data using **`PTransform`**s,\n",
        "which stands for _Parallel Transform_.\n",
        "A `PTransform` is like a __function__,\n",
        "they take some inputs, transform them and create some outputs."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "Me_hvd6XgtPC"
      },
      "source": [
        "![A PTransform is a function to transform an element of a PCollection]()"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "SVk4BsuKVlLj"
      },
      "source": [
        "Now let's dive into creating our first pipeline.\n",
        "\n",
        "For this first pipeline, let's just feed it some data from a Python list and print the results.\n",
        "\n",
        "Each _step_ in the pipeline is delimited by the _pipe operator_ `|`.\n",
        "The outputs of each transform are passed to the next transform as inputs.\n",
        "And we can save the final results into a `PCollection` variable.\n",
        "\n",
        "```py\n",
        "# We pass the elements from step1 through step3 and save the results into `outputs`.\n",
        "outputs = pipeline | step1 | step2 | step3\n",
        "```\n",
        "\n",
        "Pipelines can quickly grow long, so it's sometimes easier to read if we surround them with parentheses and break them into multiple lines.\n",
        "\n",
        "```py\n",
        "# This is equivalent to the example above.\n",
        "outputs = (\n",
        "  pipeline\n",
        "  | step1\n",
        "  | step2\n",
        "  | step3\n",
        ")\n",
        "```\n",
        "\n",
        "Also, Beam expects each transform, or step, to have a unique _label_, or description.\n",
        "This makes it a lot easier to debug, and it's in general a good practice to start.\n",
        "You can use the _right shift operator_ `>>` to add a label to your transforms, like `'My description' >> MyTransform`.\n",
        "\n",
        "```py\n",
        "# Try to give short but descriptive labels.\n",
        "# These serve both as comments and help debug later on.\n",
        "outputs = (\n",
        "  pipeline\n",
        "  | 'First step' >> step1\n",
        "  | 'Second step' >> step2\n",
        "  | 'Third step' >> step3\n",
        ")\n",
        "```\n",
        "\n",
        "> ℹ️&nbsp; The syntax might seem a little different at first, but you'll become familiar with it.\n",
        "\n",
        "We use the `Create` transform to feed the pipeline with an\n",
        "[`iterable`](https://docs.python.org/3/glossary.html#term-iterable)\n",
        "of elements, like a `list`.\n",
        "\n",
        "Let's try to see what happens if we try to `print` a PCollection."
      ]
    },
    {
      "cell_type": "code",
      "metadata": {
        "id": "XAke4d5lV5f5",
        "outputId": "eaf5954c-3a66-4ef7-a258-35797ce0bbb9",
        "colab": {
          "base_uri": "https://localhost:8080/"
        }
      },
      "source": [
        "import apache_beam as beam\n",
        "\n",
        "inputs = [0, 1, 2, 3]\n",
        "\n",
        "# Create a pipeline.\n",
        "with beam.Pipeline() as pipeline:\n",
        "  # Feed it some input elements with `Create`.\n",
        "  outputs = (\n",
        "      pipeline\n",
        "      | 'Create initial values' >> beam.Create(inputs)\n",
        "  )\n",
        "\n",
        "  # `outputs` is a PCollection with our input elements.\n",
        "  # But printing it directly won't show us its contents :(\n",
        "  print(f\"outputs: {outputs}\")"
      ],
      "execution_count": 3,
      "outputs": [
        {
          "output_type": "stream",
          "text": [
            "outputs: PCollection[[3]: Create initial values/Map(decode).None]\n"
          ],
          "name": "stdout"
        }
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "h0UUmpwRADqA"
      },
      "source": [
        "> ℹ️ In Beam, you can __NOT__ access the elements from a `PCollection` directly like a Python list.\n",
        "> This means, we can't simply `print` the output `PCollection` to see the elements.\n",
        ">\n",
        "> This is because, depending on the runner,\n",
        "> the `PCollection` elements might live in multiple worker machines.\n",
        "\n",
        "To print the elements in the PCollection, we'll do a little trick, but we'll explain it shortly."
      ]
    },
    {
      "cell_type": "code",
      "metadata": {
        "id": "IblMlbE8_4CJ",
        "outputId": "dc825083-d157-4fae-fff9-4db33dbc0876",
        "colab": {
          "base_uri": "https://localhost:8080/"
        }
      },
      "source": [
        "import apache_beam as beam\n",
        "\n",
        "inputs = [0, 1, 2, 3]\n",
        "\n",
        "with beam.Pipeline() as pipeline:\n",
        "  outputs = (\n",
        "      pipeline\n",
        "      | 'Create initial values' >> beam.Create(inputs)\n",
        "  )\n",
        "\n",
        "  # We can only access the elements through another transform.\n",
        "  # Don't worry if you don't know what's happening here,\n",
        "  # we'll get to it just next :)\n",
        "  outputs | beam.Map(print)"
      ],
      "execution_count": 4,
      "outputs": [
        {
          "output_type": "stream",
          "text": [
            "0\n",
            "1\n",
            "2\n",
            "3\n"
          ],
          "name": "stdout"
        }
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "FY9TP3Tw5eZt"
      },
      "source": [
        "# Transforming data\n",
        "\n",
        "Apache Beam is designed with a [functional paradigm](https://en.wikipedia.org/wiki/Functional_programming).\n",
        "This means that, instead of _loops_, it uses `PTransform`s alongside with _functions_ to process each element in a `PCollection`.\n",
        "\n",
        "Let's go through some of the most common and basic data transforms in Beam."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "UMKgi9LD6-xb"
      },
      "source": [
        "## Map: _one-to-one_\n",
        "\n",
        "Let's say we have some elements and we want to do something with each element.\n",
        "\n",
        "We want to `map` a function to each element of the collection.\n",
        "\n",
        "`map` takes a _function_ that transforms a single input `a` into a single output `b`.\n",
        "\n",
        "> ℹ️ For example, we want to multiply each element by 2."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "tDtzHrSdjQx8"
      },
      "source": [
        "![Transforms each individual element in a collection]()"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "G0NJrjja9YRF"
      },
      "source": [
        "In Python, this is commonly done with the\n",
        "[built-in `map` function](https://docs.python.org/3/library/functions.html#map), or with\n",
        "[list comprehensions](https://docs.python.org/3/tutorial/datastructures.html#list-comprehensions)."
      ]
    },
    {
      "cell_type": "code",
      "metadata": {
        "id": "MHzR7CuZqcQq",
        "outputId": "295e24fb-506e-4080-ee04-755ae533888b",
        "colab": {
          "base_uri": "https://localhost:8080/"
        }
      },
      "source": [
        "inputs = [0, 1, 2, 3]\n",
        "\n",
        "# Using the `map` function.\n",
        "outputs = map(lambda x: x * 2, inputs)\n",
        "print(list(outputs))\n",
        "\n",
        "# Using a list comprehension.\n",
        "outputs = [x * 2 for x in inputs]\n",
        "print(outputs)\n",
        "\n",
        "# Roughly equivalent for loop.\n",
        "outputs = []\n",
        "for x in inputs:\n",
        "  outputs.append(x * 2)\n",
        "print(outputs)"
      ],
      "execution_count": 5,
      "outputs": [
        {
          "output_type": "stream",
          "text": [
            "[0, 2, 4, 6]\n",
            "[0, 2, 4, 6]\n",
            "[0, 2, 4, 6]\n"
          ],
          "name": "stdout"
        }
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "4vaBuNFW-5FG"
      },
      "source": [
        "In Beam, there is the\n",
        "[`Map` transform](https://beam.apache.org/documentation/transforms/python/elementwise/map/),\n",
        "but we must use it within a pipeline.\n",
        "\n",
        "First we create a pipeline and feed it our input elements.\n",
        "Then we _pipe_ those elements into a `Map` transform where we apply our function."
      ]
    },
    {
      "cell_type": "code",
      "metadata": {
        "id": "LoYq871Q96iu",
        "outputId": "57c2155f-cebb-42ae-e175-e1554f2f806b",
        "colab": {
          "base_uri": "https://localhost:8080/"
        }
      },
      "source": [
        "import apache_beam as beam\n",
        "\n",
        "inputs = [0, 1, 2, 3]\n",
        "\n",
        "with beam.Pipeline() as pipeline:\n",
        "  outputs = (\n",
        "      pipeline\n",
        "      | 'Create values' >> beam.Create(inputs)\n",
        "      | 'Multiply by 2' >> beam.Map(lambda x: x * 2)\n",
        "  )\n",
        "\n",
        "  outputs | beam.Map(print)"
      ],
      "execution_count": null,
      "outputs": [
        {
          "output_type": "stream",
          "text": [
            "0\n",
            "2\n",
            "4\n",
            "6\n"
          ],
          "name": "stdout"
        }
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "Q06jvwmqOzer"
      },
      "source": [
        "> ℹ️ Now that we know how `Map` works, we can see what's happening when we print the elements.\n",
        ">\n",
        "> We have our outputs stored in the `outputs` `PCollection`, so we _pipe_ it to a `Map` transform to apply the\n",
        "> [`print`](https://docs.python.org/3/library/functions.html#print)\n",
        "> function.\n",
        ">\n",
        "> Note that `print` returns `None`, so we get an output `PCollection` of all `None` elements.\n",
        "> But we are not saving its results to any variable,\n",
        "> so they get discarded.\n",
        ">\n",
        "> This does _not_ affect the values in `outputs` in any way."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "m8AVwzVyCAPD"
      },
      "source": [
        "## FlatMap: _one-to-many_\n",
        "\n",
        "`Map` allows us to transform each individual element,\n",
        "but we can't change the number of elements with it.\n",
        "\n",
        "We want to `map` a function to each element of a collection.\n",
        "That function returns a _list of output elements_,\n",
        "so we would get a _list of lists of elements_.\n",
        "Then we want to _flatten_ the _list of lists_ into a single _list_.\n",
        "\n",
        "`flatMap` takes a function that transforms a single input `a` into an `iterable` of outputs `b`.\n",
        "But we get a _single collection_ containing the outputs of _all_ the elements.\n",
        "\n",
        "> ℹ️ For example, we want to have as many elements as the element's value.\n",
        "> For a value `1` we want one element, and three elements for a value `3`."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "t3KpC-tej1CS"
      },
      "source": [
        "![Expands each element into zero or more elements, and flattens all the elements into a single output collection]()"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "QdQEEbtNKpQJ"
      },
      "source": [
        "In Python this could be done with a _nested list comprehension_,\n",
        "but it's a little tricky to read."
      ]
    },
    {
      "cell_type": "code",
      "metadata": {
        "id": "0xqSUP5mKNqI",
        "outputId": "58251379-ed6f-4452-e9ea-60e59394ee5a",
        "colab": {
          "base_uri": "https://localhost:8080/"
        }
      },
      "source": [
        "inputs = [0, 1, 2, 3]\n",
        "\n",
        "# Using a list comprehensions.\n",
        "mapOutputs = [[x for _ in range(x)] for x in inputs]\n",
        "# After the map function, flatten the results.\n",
        "outputs = [x for xs in mapOutputs for x in xs]\n",
        "print(outputs)\n",
        "\n",
        "# Roughly equivalent for loop.\n",
        "outputs = []\n",
        "for x in inputs:\n",
        "  outputs += [x for _ in range(x)]\n",
        "print(outputs)"
      ],
      "execution_count": null,
      "outputs": [
        {
          "output_type": "stream",
          "text": [
            "[1, 2, 2, 3, 3, 3]\n",
            "[1, 2, 2, 3, 3, 3]\n"
          ],
          "name": "stdout"
        }
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "OP3bSQoYMOH6"
      },
      "source": [
        "The good news is that Beam already has a\n",
        "[`FlatMap` transform](https://beam.apache.org/documentation/transforms/python/elementwise/flatmap/)\n",
        "built-in, so it's actually easier than plain Python.\n",
        "\n",
        "`FlatMap` accepts a function that takes a single input element and outputs an `iterable` of elements."
      ]
    },
    {
      "cell_type": "code",
      "metadata": {
        "id": "gZbsyAWaMT-F",
        "outputId": "f2623c5c-d709-4229-c293-952b9bbcbef4",
        "colab": {
          "base_uri": "https://localhost:8080/"
        }
      },
      "source": [
        "import apache_beam as beam\n",
        "\n",
        "inputs = [0, 1, 2, 3]\n",
        "\n",
        "with beam.Pipeline() as pipeline:\n",
        "  outputs = (\n",
        "      pipeline\n",
        "      | 'Create values' >> beam.Create(inputs)\n",
        "      | 'Expand elements' >> beam.FlatMap(lambda x: [x for _ in range(x)])\n",
        "  )\n",
        "\n",
        "  outputs | beam.Map(print)"
      ],
      "execution_count": null,
      "outputs": [
        {
          "output_type": "stream",
          "text": [
            "1\n",
            "2\n",
            "2\n",
            "3\n",
            "3\n",
            "3\n"
          ],
          "name": "stdout"
        }
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "YHcWp18cMefc"
      },
      "source": [
        "> ℹ️ Try replacing the `FlatMap` transform with `Map` to see how they behave differently."
      ]
    },
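    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "One way to try the suggestion above is to apply both transforms to the same `PCollection` and compare what gets printed.\n",
        "The following is a minimal sketch and not part of the original tour: the helper name `repeat` and the step labels are hypothetical, chosen only for illustration.\n",
        "With `Map`, each input should produce exactly one output element (a whole list), while `FlatMap` should emit the items of each returned list as separate elements.\n",
        "\n",
        "```py\n",
        "import apache_beam as beam\n",
        "\n",
        "\n",
        "def repeat(x):\n",
        "  # Hypothetical helper: returns a list with `x` copies of `x`.\n",
        "  return [x for _ in range(x)]\n",
        "\n",
        "\n",
        "inputs = [0, 1, 2, 3]\n",
        "\n",
        "with beam.Pipeline() as pipeline:\n",
        "  values = pipeline | 'Create values' >> beam.Create(inputs)\n",
        "\n",
        "  # Map: one output element per input element, so each output is a list.\n",
        "  (\n",
        "      values\n",
        "      | 'Expand with Map' >> beam.Map(repeat)\n",
        "      | 'Print Map results' >> beam.Map(lambda xs: print('Map:', xs))\n",
        "  )\n",
        "\n",
        "  # FlatMap: each returned list is flattened into individual elements.\n",
        "  (\n",
        "      values\n",
        "      | 'Expand with FlatMap' >> beam.FlatMap(repeat)\n",
        "      | 'Print FlatMap results' >> beam.Map(lambda x: print('FlatMap:', x))\n",
        "  )\n",
        "```\n",
        "\n",
        "The two branches may print in any order, since a `PCollection` makes no order guarantees."
      ]
    },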
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "ad-O7lUxOoyD"
      },
      "source": [
        "## Filter: _one-to-zero_\n",
        "\n",
        "Sometimes we want to *only* process certain elements while ignoring others.\n",
        "\n",
        "We want to `filter` each element in a collection using a function.\n",
        "\n",
        "`filter` takes a function that checks a single element `a`,\n",
        "and returns `True` to keep the element, or `False` to discard it.\n",
        "\n",
        "> ℹ️ For example, we only want to keep number that are *even*, or divisible by two.\n",
        "> We can use the\n",
        "> [modulo operator `%`](https://en.wikipedia.org/wiki/Modulo_operation)\n",
        "> for a simple check."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "CUVXfvixkckY"
      },
      "source": [
        "![Filters out elements that meet a given condition]()"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "Y-gWtFNockwq"
      },
      "source": [
        "In Python we can do this with *list comprehensions* as well."
      ]
    },
    {
      "cell_type": "code",
      "metadata": {
        "id": "4xXSy5Bga9td",
        "outputId": "90d5d9a7-d925-4b11-91c8-e6e844f9d21e",
        "colab": {
          "base_uri": "https://localhost:8080/"
        }
      },
      "source": [
        "inputs = [0, 1, 2, 3]\n",
        "\n",
        "# Using a list comprehensions.\n",
        "outputs = [x for x in inputs if x % 2 == 0]\n",
        "print(outputs)\n",
        "\n",
        "# Roughly equivalent for loop.\n",
        "outputs = []\n",
        "for x in inputs:\n",
        "  if x % 2 == 0:\n",
        "    outputs.append(x)\n",
        "print(outputs)"
      ],
      "execution_count": null,
      "outputs": [
        {
          "output_type": "stream",
          "text": [
            "[0, 2]\n",
            "[0, 2]\n"
          ],
          "name": "stdout"
        }
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "mtVSijNAc0Fi"
      },
      "source": [
        "In Beam, there is the\n",
        "[`Filter` transform](https://beam.apache.org/documentation/transforms/python/elementwise/filter/)."
      ]
    },
    {
      "cell_type": "code",
      "metadata": {
        "id": "l7W9UafLkiO1",
        "outputId": "a59e0cb0-3cb9-43a6-8043-5789380d938c",
        "colab": {
          "base_uri": "https://localhost:8080/"
        }
      },
      "source": [
        "import apache_beam as beam\n",
        "\n",
        "inputs = [0, 1, 2, 3]\n",
        "\n",
        "with beam.Pipeline() as pipeline:\n",
        "  outputs = (\n",
        "      pipeline\n",
        "      | 'Create values' >> beam.Create(inputs)\n",
        "      | 'Keep only even numbers' >> beam.Filter(lambda x: x % 2 == 0)\n",
        "  )\n",
        "\n",
        "  outputs | beam.Map(print)"
      ],
      "execution_count": null,
      "outputs": [
        {
          "output_type": "stream",
          "text": [
            "0\n",
            "2\n"
          ],
          "name": "stdout"
        }
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "_yYNTszgktLY"
      },
      "source": [
        "## Combine: _many-to-one_\n",
        "\n",
        "We also need a way to get a single value from an entire `PCollection`.\n",
        "We might want to get the total number of elements, or the average value, or any other type of _aggregation_ of values.\n",
        "\n",
        "We want to `combine` the elements in a collection into a single output.\n",
        "\n",
        "`combine` takes a function that transforms an `iterable` of inputs `a`, and returns a single output `a`.\n",
        "\n",
        "Other common names for this function are `fold` and `reduce`.\n",
        "\n",
        "> ℹ️ For example, we want to add all numbers together."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "Too-Ru1xGj7e"
      },
      "source": [
        "![Combines a collection of elements into a single element through a function]()"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "F60IfKPOmlkg"
      },
      "source": [
        "In Python this is usually achieved with the\n",
        "[`reduce` function](https://docs.python.org/3/library/functools.html#functools.reduce)."
      ]
    },
    {
      "cell_type": "code",
      "metadata": {
        "id": "4qjUZvT2k878",
        "outputId": "1868d167-529e-4e5f-ec27-3ba405226a1d",
        "colab": {
          "base_uri": "https://localhost:8080/"
        }
      },
      "source": [
        "from functools import reduce\n",
        "\n",
        "inputs = [0, 1, 2, 3]\n",
        "\n",
        "# Using reduce (most general way).\n",
        "output = reduce(lambda x, y: x + y, inputs, 0)\n",
        "print(output)\n",
        "\n",
        "# Using the built-in sum function, which is itself a \"reduce\" function.\n",
        "output = sum(inputs)\n",
        "print(output)\n",
        "\n",
        "# Roughly equivalent for loop.\n",
        "output = 0\n",
        "for x in inputs:\n",
        "  y = output\n",
        "  output = x + y\n",
        "print(output)"
      ],
      "execution_count": null,
      "outputs": [
        {
          "output_type": "stream",
          "text": [
            "6\n",
            "6\n",
            "6\n"
          ],
          "name": "stdout"
        }
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "kotSZnlZoO8D"
      },
      "source": [
        "In Beam, there are\n",
        "[aggregation transforms](https://beam.apache.org/documentation/transforms/python/overview/#aggregation).\n",
        "\n",
        "For this particular example, we can use the\n",
        "[`CombineGlobally` transform](https://beam.apache.org/documentation/transforms/python/aggregation/sum/)\n",
        "which accepts a function that takes an iterable of elements as an input and outputs a single value.\n",
        "\n",
        "We can pass the\n",
        "[built-in function `sum`](https://docs.python.org/3/library/functions.html#sum)\n",
        "into `CombineGlobally`."
      ]
    },
    {
      "cell_type": "code",
      "metadata": {
        "id": "PoRd7hlnoOu5",
        "outputId": "b34f987f-ddb3-4808-95eb-9ac6140d2ea2",
        "colab": {
          "base_uri": "https://localhost:8080/"
        }
      },
      "source": [
        "import apache_beam as beam\n",
        "\n",
        "inputs = [0, 1, 2, 3]\n",
        "\n",
        "with beam.Pipeline() as pipeline:\n",
        "  outputs = (\n",
        "      pipeline\n",
        "      | 'Create values' >> beam.Create(inputs)\n",
        "      | 'Sum all values together' >> beam.CombineGlobally(sum)\n",
        "  )\n",
        "\n",
        "  outputs | beam.Map(print)"
      ],
      "execution_count": null,
      "outputs": [
        {
          "output_type": "stream",
          "text": [
            "6\n"
          ],
          "name": "stdout"
        }
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "pFb98ioSp9YU"
      },
      "source": [
        "> ℹ️ There are many ways to combine values in Beam.\n",
        "> You could even combine them into a different data type by defining a custom `CombineFn`.\n",
        ">\n",
        "> You can learn more about them by checking the available\n",
        "> [aggregation transforms](https://beam.apache.org/documentation/transforms/python/overview/#aggregation)."
      ]
    },
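    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "As an illustration of the custom `CombineFn` mentioned above, here is a minimal sketch that averages the values, so integer inputs are combined into a `float`.\n",
        "The class name `AverageFn` and the step labels are hypothetical; the four methods it overrides are the ones a `beam.CombineFn` subclass is expected to provide.\n",
        "\n",
        "```py\n",
        "import apache_beam as beam\n",
        "\n",
        "\n",
        "class AverageFn(beam.CombineFn):\n",
        "  def create_accumulator(self):\n",
        "    # Start with an empty (sum, count) pair.\n",
        "    return (0, 0)\n",
        "\n",
        "  def add_input(self, accumulator, element):\n",
        "    # Fold one more element into the running (sum, count).\n",
        "    total, count = accumulator\n",
        "    return (total + element, count + 1)\n",
        "\n",
        "  def merge_accumulators(self, accumulators):\n",
        "    # Combine partial (sum, count) pairs, possibly from different workers.\n",
        "    totals, counts = zip(*accumulators)\n",
        "    return (sum(totals), sum(counts))\n",
        "\n",
        "  def extract_output(self, accumulator):\n",
        "    # Turn the final (sum, count) into the average.\n",
        "    total, count = accumulator\n",
        "    return total / count if count else float('nan')\n",
        "\n",
        "\n",
        "inputs = [0, 1, 2, 3]\n",
        "\n",
        "with beam.Pipeline() as pipeline:\n",
        "  outputs = (\n",
        "      pipeline\n",
        "      | 'Create values' >> beam.Create(inputs)\n",
        "      | 'Average all values' >> beam.CombineGlobally(AverageFn())\n",
        "  )\n",
        "\n",
        "  outputs | beam.Map(print)\n",
        "```\n",
        "\n",
        "The accumulator keeps a running `(sum, count)` pair rather than the average itself, so that partial results can be merged before the final division."
      ]
    },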
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "cOnJoaE3qmMv"
      },
      "source": [
        "## GroupByKey: _group related elements_\n",
        "\n",
        "Sometimes it's useful to pair each element with a *key* that we can use to group related elements together.\n",
        "\n",
        "Think of it as creating a\n",
        "[Python `dict`](https://docs.python.org/3/tutorial/datastructures.html#dictionaries)\n",
        "from a list of `(key, value)` pairs,\n",
        "but instead of replacing the value on a \"duplicate\" key,\n",
        "you would get a list of all the values associated with that key.\n",
        "\n",
        "> ℹ️ For example, we want to group each animal with the list of foods they like, and we start with `(animal, food)` pairs."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "IX9VmTH7xluM"
      },
      "source": [
        "![Groups related key-value pairs by their keys]()"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "Sic8de3btYli"
      },
      "source": [
        "There's no built-in function for `groupByKey` in plain Python,\n",
        "but here's a simple implementation."
      ]
    },
    {
      "cell_type": "code",
      "metadata": {
        "id": "EzJPZJCWtWF9",
        "outputId": "cd972c46-7c05-49ab-a57d-e6e655349544",
        "colab": {
          "base_uri": "https://localhost:8080/"
        }
      },
      "source": [
        "from functools import reduce\n",
        "\n",
        "inputs = [\n",
        "  ('🐹', '🌽'),\n",
        "  ('🐼', '🎋'),\n",
        "  ('🐰', '🥕'),\n",
        "  ('🐹', '🌰'),\n",
        "  ('🐰', '🥒'),\n",
        "]\n",
        "\n",
        "\n",
        "# Since we're getting a single dict from all the elements,\n",
        "# we can use reduce for this.\n",
        "def groupByKey(result, keyValue):\n",
        "  key, value = keyValue\n",
        "  values = result.get(key, []) + [value]\n",
        "  return {**result, key: values}\n",
        "output = reduce(groupByKey, inputs, {})\n",
        "print(output)\n",
        "\n",
        "\n",
        "# Roughly equivalent for loop.\n",
        "output = {}\n",
        "for key, value in inputs:\n",
        "  values = output.get(key, []) + [value]\n",
        "  output = {**output, key: values}\n",
        "print(output)"
      ],
      "execution_count": null,
      "outputs": [
        {
          "output_type": "stream",
          "text": [
            "{'🐹': ['🌽', '🌰'], '🐼': ['🎋'], '🐰': ['🥕', '🥒']}\n",
            "{'🐹': ['🌽', '🌰'], '🐼': ['🎋'], '🐰': ['🥕', '🥒']}\n"
          ],
          "name": "stdout"
        }
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "1EmT-9Zawa_q"
      },
      "source": [
        "In Beam, there is the\n",
        "[`GroupByKey` transform](https://beam.apache.org/documentation/transforms/python/aggregation/groupbykey/)."
      ]
    },
    {
      "cell_type": "code",
      "metadata": {
        "id": "67fZUiWiwnAf",
        "outputId": "f95c455c-c554-41de-bc8c-e755584064d2",
        "colab": {
          "base_uri": "https://localhost:8080/"
        }
      },
      "source": [
        "import apache_beam as beam\n",
        "\n",
        "inputs = [\n",
        "  ('🐹', '🌽'),\n",
        "  ('🐼', '🎋'),\n",
        "  ('🐰', '🥕'),\n",
        "  ('🐹', '🌰'),\n",
        "  ('🐰', '🥒'),\n",
        "]\n",
        "\n",
        "with beam.Pipeline() as pipeline:\n",
        "  outputs = (\n",
        "      pipeline\n",
        "      | 'Create (animal, food) pairs' >> beam.Create(inputs)\n",
        "      | 'Group foods by animals' >> beam.GroupByKey()\n",
        "  )\n",
        "\n",
        "  outputs | beam.Map(print)"
      ],
      "execution_count": null,
      "outputs": [
        {
          "output_type": "stream",
          "text": [
            "('🐹', ['🌽', '🌰'])\n",
            "('🐼', ['🎋'])\n",
            "('🐰', ['🥕', '🥒'])\n"
          ],
          "name": "stdout"
        }
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "gnoz_mWtxSjW"
      },
      "source": [
        "# What's next?\n",
        "\n",
        "* ![Open in Colab](https://github.com/googlecolab/open_in_colab/raw/main/images/icon16.png)\n",
        "  [Reading and writing data](https://colab.research.google.com/github/apache/beam/blob/master/examples/notebooks/tour-of-beam/reading-and-writing-data.ipynb) --\n",
        "  how to read and write data to and from different data formats. \n",
        "* [Transform catalog](https://beam.apache.org/documentation/transforms/python/overview) --\n",
        "  check out all the available transforms.\n",
        "* [Mobile gaming example](https://beam.apache.org/get-started/mobile-gaming-example) --\n",
        "  learn more about windowing, triggers, and streaming through a complete example pipeline.\n",
        "* [Runners](https://beam.apache.org/documentation/runners/capability-matrix) --\n",
        "  check the available runners, their capabilities, and how to run your pipeline in them."
      ]
    }
  ]
}